{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Forest Inference Library (FIL)\n",
    "The forest inference library is used to load saved forest models of xgboost, lightgbm and perform inference on them. It can be used to perform both classification and regression. In this notebook, we'll begin by fitting a model with XGBoost and saving it. We'll then load the saved model into FIL and use it to infer on new data.\n",
    "\n",
    "FIL works in the same way with lightgbm model as well.\n",
    "\n",
    "The model accepts both numpy arrays and cuDF dataframes. In order to convert your dataset to cudf format please read the cudf documentation on https://docs.rapids.ai/api/cudf/stable. \n",
    "\n",
    "For additional information on the forest inference library please refer to the documentation on https://docs.rapids.ai/api/cuml/stable/api.html#forest-inferencing"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import cupy\n",
    "import os\n",
    "\n",
    "from cuml.testing.utils import array_equal\n",
    "from cuml.internals.import_utils import has_xgboost\n",
    "\n",
    "from cuml.datasets import make_classification\n",
    "from cuml.metrics import accuracy_score\n",
    "from cuml.model_selection import train_test_split\n",
    "    \n",
    "from cuml import ForestInference"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Check for xgboost\n",
    "Checks if xgboost is present, if not then it throws an error."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "if has_xgboost():\n",
    "    import xgboost as xgb\n",
    "else:\n",
    "    raise ImportError(\"Please install xgboost using the conda package,\"\n",
    "                      \"e.g.: conda install -c conda-forge xgboost\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Define parameters"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# synthetic data size\n",
    "n_rows = 10000\n",
    "n_columns = 100\n",
    "n_categories = 2\n",
    "random_state = cupy.random.RandomState(43210)\n",
    "\n",
    "# fraction of data used for model training\n",
    "train_size = 0.8\n",
    "\n",
    "# trained model output filename\n",
    "model_path = 'xgb.model'\n",
    "\n",
    "# num of iterations for which xgboost is trained\n",
    "num_rounds = 100\n",
    "\n",
    "# maximum tree depth in each training round\n",
    "max_depth = 20"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Generate data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# create the dataset\n",
    "X, y = make_classification(\n",
    "    n_samples=n_rows,\n",
    "    n_features=n_columns,\n",
    "    n_informative=int(n_columns/5),\n",
    "    n_classes=n_categories,\n",
    "    random_state=42\n",
    ")\n",
    "\n",
    "# convert the dataset to float32\n",
    "X = X.astype('float32')\n",
    "y = y.astype('float32')\n",
    "\n",
    "# split the dataset into training and validation splits\n",
    "X_train, X_validation, y_train, y_validation = train_test_split(X, y, train_size=0.8)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Train helper function\n",
    "Defines a simple function that trains the XGBoost model and returns the trained model.\n",
    "\n",
    "For additional information on the xgboost library please refer to the documentation on : \n",
    "https://xgboost.readthedocs.io/en/latest/parameter.html"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def train_xgboost_model(\n",
    "    X_train, \n",
    "    y_train,\n",
    "    model_path='xgb.model',\n",
    "    num_rounds=100, \n",
    "    max_depth=20\n",
    "):\n",
    "    \n",
    "    # set the xgboost model parameters\n",
    "    params = {\n",
    "        'verbosity': 0, \n",
    "        'eval_metric':'error',\n",
    "        'objective':'binary:logistic',\n",
    "        'max_depth': max_depth,\n",
    "        'tree_method': 'gpu_hist'\n",
    "    }\n",
    "    \n",
    "    # convert training data into DMatrix\n",
    "    dtrain = xgb.DMatrix(X_train, label=y_train)\n",
    "    \n",
    "    # train the xgboost model\n",
    "    trained_model = xgb.train(params, dtrain, num_rounds)\n",
    "\n",
    "    # save the trained xgboost model\n",
    "    trained_model.save_model(model_path)\n",
    "\n",
    "    return trained_model"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Predict helper function\n",
    "Uses the trained xgboost model to perform prediction and return the labels."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def predict_xgboost_model(X_validation, y_validation, xgb_model):\n",
    "\n",
    "    # predict using the xgboost model\n",
    "    dvalidation = xgb.DMatrix(X_validation, label=y_validation)\n",
    "    predictions = xgb_model.predict(dvalidation)\n",
    "\n",
    "    # convert the predicted values from xgboost into class labels\n",
    "    predictions = cupy.around(predictions)\n",
    "    \n",
    "    return predictions"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Train and Predict the model\n",
    "Invoke the function to train the model and get predictions so that we can validate them."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "# train the xgboost model\n",
    "xgboost_model = train_xgboost_model(\n",
    "    X_train, \n",
    "    y_train, \n",
    "    model_path,\n",
    "    num_rounds,\n",
    "    max_depth\n",
    ")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "# test the xgboost model\n",
    "trained_model_preds = predict_xgboost_model(\n",
    "    X_validation,\n",
    "    y_validation,\n",
    "    xgboost_model\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Load Forest Inference Library (FIL)\n",
    "\n",
    "The load function of the ForestInference class accepts the following parameters:\n",
    "\n",
    "       filename : str\n",
    "           Path to saved model file in a treelite-compatible format\n",
    "           (See https://treelite.readthedocs.io/en/latest/treelite-api.html\n",
    "        output_class : bool\n",
    "           If true, return a 1 or 0 depending on whether the raw prediction\n",
    "           exceeds the threshold. If False, just return the raw prediction.\n",
    "        threshold : float\n",
    "           Cutoff value above which a prediction is set to 1.0\n",
    "           Only used if the model is classification and output_class is True\n",
    "        algo : string name of the algo from (from algo_t enum)\n",
    "             'NAIVE' - simple inference using shared memory\n",
    "             'TREE_REORG' - similar to naive but trees rearranged to be more\n",
    "                              coalescing-friendly\n",
    "             'BATCH_TREE_REORG' - similar to TREE_REORG but predicting\n",
    "                                    multiple rows per thread block\n",
    "        model_type : str\n",
    "            Format of saved treelite model to load.\n",
    "            Can be 'xgboost', 'lightgbm'"
   ]
  },
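  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To make the interaction between output_class and threshold concrete, here is a minimal NumPy sketch of the thresholding described above. The helper apply_threshold is hypothetical and written only for illustration; it is not part of FIL and does not claim to mirror FIL's internal implementation.\n",
    "\n",
    "```python\n",
    "import numpy as np\n",
    "\n",
    "\n",
    "def apply_threshold(raw_predictions, threshold=0.5):\n",
    "    # hypothetical helper: 1.0 where the raw prediction exceeds the threshold, else 0.0\n",
    "    raw = np.asarray(raw_predictions, dtype=np.float32)\n",
    "    return (raw > threshold).astype(np.float32)\n",
    "\n",
    "\n",
    "# raw probabilities such as a binary:logistic model would produce\n",
    "labels = apply_threshold([0.1, 0.5, 0.73, 0.99], threshold=0.5)\n",
    "print(labels)  # [0. 0. 1. 1.]\n",
    "```\n",
    "\n",
    "A prediction exactly equal to the threshold does not exceed it, so under this reading it maps to 0."
   ]
  },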
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Loaded the saved model\n",
    "Use FIL to load the saved xgboost model"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "fil_model = ForestInference.load(\n",
    "    filename=model_path,\n",
    "    algo='BATCH_TREE_REORG',\n",
    "    output_class=True,\n",
    "    threshold=0.50,\n",
    "    model_type='xgboost_ubj'\n",
    ")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Predict using FIL"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "# perform prediction on the model loaded from path\n",
    "fil_preds = fil_model.predict(X_validation)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Evaluate results\n",
    "\n",
    "Verify the predictions for the original and FIL model match."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(\"The shape of predictions obtained from xgboost : \", (trained_model_preds).shape)\n",
    "print(\"The shape of predictions obtained from FIL : \", (fil_preds).shape)\n",
    "print(\"Are the predictions for xgboost and FIL the same : \",  array_equal(trained_model_preds, fil_preds))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Distributed FIL with Dask"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now lets demonstrate how we can use FIL and Dask together to run parallel inference on multiple-GPUs while leveraging our trained model. \n",
    "\n",
    "Below we will:\n",
    "1. **create a Dask cluster** with n_GPU workers,\n",
    "\n",
    "2. **generate synthetic data** and partition it evenly among the workers,\n",
    "    \n",
    "3. **load FIL model** on each worker,\n",
    "\n",
    "4. and **run parallel FIL .predict()** on each worker"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "*Optional Kernel Restart*"
   ]
  },
  {
   "cell_type": "raw",
   "metadata": {},
   "source": [
    "import IPython\n",
    "IPython.Application.instance().kernel.do_shutdown(restart=True)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Dask Imports"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from dask_cuda import LocalCUDACluster\n",
    "from distributed import Client, wait, get_worker\n",
    "\n",
    "import dask.dataframe\n",
    "import dask.array\n",
    "import dask_cudf\n",
    "\n",
    "from cuml import ForestInference\n",
    "import time"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create a LocalCUDACluster"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Note that we'll be partitioning the data equally among the workers. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "cluster = LocalCUDACluster()\n",
    "client = Client(cluster)\n",
    "\n",
    "workers = client.has_what().keys()\n",
    "n_workers = len(workers)\n",
    "n_partitions = n_workers"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Define size of synthetic data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "rows = 1_000_000\n",
    "cols = 100"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Generate synthetic query/inference data"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Next we will generate data on the CPU as a dask array, then move it into GPU memory as a dask.dataframe and ultimately convert it into a dask_cudf.dataframe so that it can be used in the upstream FIL predict."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "x = dask.array.random.random(\n",
    "    size=(rows, cols), \n",
    "    chunks=(rows//n_partitions, cols)\n",
    ").astype('float32')"
   ]
  },
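  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Because the chunk length above is rows//n_partitions, the array has exactly n_partitions chunks only when rows divides evenly. The plain-Python sketch below works through that arithmetic. The helper chunk_sizes is hypothetical, and it assumes Dask fills chunks of the requested length and puts any leftover rows in one final, smaller chunk.\n",
    "\n",
    "```python\n",
    "def chunk_sizes(n_rows, n_partitions):\n",
    "    # hypothetical helper: chunk lengths for a requested length of n_rows // n_partitions\n",
    "    chunk = n_rows // n_partitions\n",
    "    full, remainder = divmod(n_rows, chunk)\n",
    "    return [chunk] * full + ([remainder] if remainder else [])\n",
    "\n",
    "\n",
    "print(chunk_sizes(1_000_000, 4))  # [250000, 250000, 250000, 250000]\n",
    "print(chunk_sizes(1_000_000, 3))  # [333333, 333333, 333333, 1]\n",
    "```\n",
    "\n",
    "With three workers, for example, the single leftover row would end up in a fourth, very small partition."
   ]
  },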
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df = dask.dataframe.from_array(x).to_backend(\"cudf\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Persist data in GPU memory\n",
    "We can optionally persist our generated data (see [Persist documentation](https://docs.dask.org/en/latest/dataframe-best-practices.html?highlight=persist#persist-intelligently)), so that our lazy dataframe starts to be executed and saved in memory."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df = df.persist()\n",
    "wait(df)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Pre-load FIL model on each worker"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Before we run inference on our distributed dataset let's first load the FIL model (trained by XGBoost above), onto each worker.\n",
    "\n",
    "Here we'll leverage the worker's local storage which will persist after the function/task completes.\n",
    "\n",
    "For more see the [Dask worker documentation on storage](https://distributed.dask.org/en/latest/worker.html#storing-data)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def worker_init(dask_worker, model_file='xgb.model'):\n",
    "   dask_worker.data[\"fil_model\"] = ForestInference.load(\n",
    "       filename=model_file,\n",
    "       algo='BATCH_TREE_REORG',\n",
    "       output_class=True,\n",
    "       threshold=0.50,\n",
    "       model_type='xgboost_ubj'\n",
    "    )"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "client.run(worker_init)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Distributed FIL Predict on persisted data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def predict(input_df):\n",
    "   worker = get_worker()\n",
    "   return worker.data[\"fil_model\"].predict(input_df)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Lets map the predict call to each of our partitions (i.e., the dask_cudf.dataframe chunks that we distributed among the workers )."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "distributed_predictions = df.map_partitions(predict, meta=\"float\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "tic = time.perf_counter()\n",
    "distributed_predictions.compute()\n",
    "toc = time.perf_counter()\n",
    "\n",
    "fil_inference_time = toc-tic"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Summarize the performance"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "total_samples = len(df)\n",
    "print(f' {total_samples:,} inferences in {fil_inference_time:.5f} seconds'\n",
    "      f' -- {int(total_samples/fil_inference_time):,} inferences per second ')"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.10.10"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
